#include "config.h"

#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <sqlite3.h>
#include <errno.h>
#include <getopt.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "core.h"
#include "cache.h"
#include "disk.h"
#include "sysy_lib.h"
#include "ynet_rpc.h"
#include "job_dock.h"
#include "net_global.h"
#include "diskmd.h"
#include "bh.h"
#include "tpool.h"
#include "lich_md.h"
#include "configure.h"
#include "dbg.h"
#include "replica.h"
#include "disk_maping.h"
#include "core.h"
#include "coroutine.h"
#include "fnotify.h"

#define __RAW_TAB_EXIST__ "table raw already exists"
#define __METADATA_TAB_EXIST__ "table metadata already exists"
#define __KEY_EXIST__ "column key is not unique"
#define __DISK_MD_SIZE__ (4096 * 6)

/*
 * Single-bit field selectors. The names mirror the columns of the raw and
 * metadata tables created in __disk_init(); their users are not visible in
 * this part of the file.
 */
#define SQLITE3_FIELD_DISK        0x00000001
#define SQLITE3_FIELD_OFFSET      0x00000002
#define SQLITE3_FIELD_PARENT      0x00000004
#define SQLITE3_FIELD_PRIORITY    0x00000008
#define SQLITE3_FIELD_META        0x00000010
#define SQLITE3_FIELD_FINGERPRINT 0x00000020
#define SQLITE3_FIELD_WBDISK      0x00000040
#define SQLITE3_FIELD_POOL        0x00000080

/*
 * Union of every selector above. The expansion is parenthesized so that it
 * behaves as one operand in expressions such as `flag & SQLITE3_FIELD_ALL`
 * or `flag == SQLITE3_FIELD_ALL` (both `&` and `==` bind tighter than `|`).
 */
#define SQLITE3_FIELD_ALL         (SQLITE3_FIELD_DISK          \
                                  | SQLITE3_FIELD_OFFSET      \
                                  | SQLITE3_FIELD_PARENT      \
                                  | SQLITE3_FIELD_PRIORITY    \
                                  | SQLITE3_FIELD_META        \
                                  | SQLITE3_FIELD_FINGERPRINT \
                                  | SQLITE3_FIELD_WBDISK      \
                                  | SQLITE3_FIELD_POOL)

/*
 * Flat index of cell (row, col) in an sqlite3_get_table() result array.
 * The "+ 1" skips the first `cols` entries of the array, which hold the
 * column names rather than data. Every argument is parenthesized so that
 * expression arguments (e.g. `a + b`) expand correctly.
 */
#define SQLITE3_INDEX(cols, row, col) (((row) + 1) * (cols) + (col))
/* Cell (row, col) of an sqlite3_get_table() result array. */
#define SQLITE3_RESULT(result, cols, row, col) ((result)[SQLITE3_INDEX(cols, row, col)])

/* Module-wide state: the home directory and one sqlite3 handle per DB slot. */
typedef struct {
        char home[MAX_PATH_LEN];        /* copy of the path given to disk_sqlite3_init() */
        sqlite3 *db[__DB_HASH__];       /* db[i] is opened from "<home>/<i>.db" by __disk_init() */
} disk_mappint_t;

/* Allocated and filled in by disk_sqlite3_init(); NULL before that. */
static disk_mappint_t *__disk__ = NULL;

/*
 * Per-slot bookkeeping record (see __disk_sqlite3__[] below).
 * NOTE(review): none of these fields is touched in the visible part of the
 * file; the comments below are read off the names and types only -- confirm.
 */
typedef struct {
        int idx;                /* presumably the DB slot index */
        sem_t sem;              /* presumably used to wake a worker */
        sy_spinlock_t lock;     /* presumably guards `list` */
        struct list_head list;  /* presumably a queue of pending entries */
} disk_sqlite3_t;

/*
 * One queued metadata request.
 * NOTE(review): this type is not used in the visible part of the file; field
 * meanings below are inferred from names and from the parameters of the
 * worker functions -- confirm before relying on them.
 */
typedef struct {
        struct list_head hook;  /* list linkage */
        int op;                 /* presumably the operation selector */
        int ret;                /* presumably the operation result */
        task_t *task;

        char *pool;
        uint32_t count;
        const chkid_t *chkid;
        diskloc_t *loc;
        chkid_t *parent;

        int *exists;

        int *priority;
        int *wbdisk;
        uint64_t *meta_version;
        uint64_t *fingerprint;

        int flag;
} entry_t;

/* One bookkeeping record per DB slot; not referenced in the visible part of this file. */
static disk_sqlite3_t  *__disk_sqlite3__[__DB_HASH__];
/* Table names handed out by __disk_table_type(). */
static const char *__raw__ = "raw";
static const char *__metadata__ = "metadata";

/* Called at the end of disk_sqlite3_init(); its definition is not in the visible part of this file. */
int disk_sqlite3_vacuum_init();

/*
 * Return the text of data cell (row, col) from an sqlite3_get_table() result.
 *
 * rows/cols are the dimensions reported by sqlite3_get_table(); row and col
 * are zero-based and asserted to be below those bounds.
 */
static inline char* __sqlite3_get_result(char **result, int rows, int cols, int row, int col)
{
        int pos;

        YASSERT(row < rows);
        YASSERT(col < cols);

        pos = SQLITE3_INDEX(cols, row, col);
        return result[pos];
}

/*
 * Name of the table that stores the given chunk: "raw" for chunks whose type
 * is __RAW_CHUNK__, "metadata" for everything else.
 */
const char * __disk_table_type(const chkid_t *chkid)
{
        return (chkid->type == __RAW_CHUNK__) ? __raw__ : __metadata__;
}

/* DB slot for a chunk id, as computed by __DISK_DB_HASH__; used to index __disk__->db[]. */
static inline int __disk_db_hash(const chkid_t *chkid)
{
        int slot = __DISK_DB_HASH__(chkid);

        return slot;
}

static void __disk_exist(const diskloc_t *loc)
{
        int ret, row, column, i;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL, **result;

#if 1
        return;
#endif

        for (i = 0; i < __DB_HASH__; i++) {
                snprintf(sql, MAX_BUF_LEN, "select key from metadata where disk=%u and offset=%u",
                         loc->diskid, loc->idx);

                ret = sqlite3_get_table(__disk__->db[i], sql, &result, &row, &column, &zErrMsg);
                if (ret != SQLITE_OK ){
                        DERROR("SQL error: %s\n", zErrMsg);
                        UNIMPLEMENTED(__DUMP__);
                }

                YASSERT(row == 0);
                sqlite3_free(zErrMsg);
                sqlite3_free_table(result);

                snprintf(sql, MAX_BUF_LEN, "select key from raw where disk=%u and offset=%u",
                         loc->diskid, loc->idx);

                ret = sqlite3_get_table(__disk__->db[i], sql, &result, &row, &column, &zErrMsg);
                if (ret != SQLITE_OK ){
                        DERROR("SQL error: %s\n", zErrMsg);
                        UNIMPLEMENTED(__DUMP__);
                }

                YASSERT(row == 0);
                sqlite3_free(zErrMsg);
                sqlite3_free_table(result);
        }
}

static void __disk_set_pragma(int idx, const char *key, const char *value)
{
        int ret;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        snprintf(sql, MAX_BUF_LEN, "PRAGMA %s = %s", key, value);
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        DBUG("set disk %d key %s value %s\n", idx, key, value);

        sqlite3_free(zErrMsg);
}

static void __disk_get_pragma(int idx, const char *key, char *value)
{
        int ret, row, column;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL, **result;

        snprintf(sql, MAX_BUF_LEN, "PRAGMA %s", key);
        ret = sqlite3_get_table(__disk__->db[idx], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        DBUG("get disk %d key %s value %s\n", idx, key, __sqlite3_get_result(result, row, column, 0, 0));

        if (value) {
                strcpy(value, __sqlite3_get_result(result, row, column, 0, 0));
        }

        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);
}

/*
 * Set a pragma, read it back, and report whether the value took effect.
 * Returns TRUE when the value read back equals `_value` ignoring case,
 * FALSE otherwise.
 */
static BOOL __disk_try_set_pragma(int idx, const char *key, const char *_value)
{
        char actual[MAX_BUF_LEN];

        __disk_set_pragma(idx, key, _value);
        __disk_get_pragma(idx, key, actual);

        return (strcasecmp(_value, actual) == 0) ? TRUE : FALSE;
}

static void __disk_pragma(int idx)
{
        //sqlite3.6 and before: unsupport WAL mod
        if (!__disk_try_set_pragma(idx, "main.journal_mode", "WAL")) {
                DWARN("disk %d set key main.journal_mode value %s fail\n", idx, "WAL");
                if (!__disk_try_set_pragma(idx, "main.journal_mode", "PERSIST")) {
                        DERROR("disk %d key main.journal_mode value %s\n", idx, "PERSIST");
                        SERROR(0, "%s, disk %d key main.journal_mode value %s\n", M_DISK_IO_ERROR, idx, "PERSIST");
                        UNIMPLEMENTED(__DUMP__);
                }
        }
        __disk_try_set_pragma(idx, "main.synchronous", "FULL");
}

static int __disk_init(const char *home, int idx)
{
        int ret;
        char path[MAX_PATH_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        DBUG("init with sqlite3 support\n");

        snprintf(path, MAX_PATH_LEN, "%s/%d.db", home, idx);
        ret = sqlite3_open(path, &__disk__->db[idx]);
        if (ret ){
                ret = EIO;
                DERROR("disk[%d] IOError\n", idx);
                SERROR(0, "%s disk[%d] IOError\n", M_DISK_IO_ERROR, idx);
                GOTO(err_ret, ret);
        }

        snprintf(sql, MAX_BUF_LEN, "create table raw (key text primary key,"
                 " disk integer, offset integer, parent text, priority integer,"
                 " meta_version integer, fingerprint integer, wbdisk integer, pool text)");
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK ) {
                if (strcmp(zErrMsg, __RAW_TAB_EXIST__)) {
                        DERROR("SQL error: %s\n", zErrMsg);
                        SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                        UNIMPLEMENTED(__DUMP__);
                } else {
                        DBUG("table chunk exist\n");
                }
        }

        sqlite3_free(zErrMsg);

        snprintf(sql, MAX_BUF_LEN, "create index if not exists IdxRawParent on raw(parent)");
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK) {
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);

        snprintf(sql, MAX_BUF_LEN, "create index if not exists IdxRawDisk on raw(disk)");
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK) {
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);

        snprintf(sql, MAX_BUF_LEN, "create table metadata (key text primary key,"
                 " disk integer, offset integer, parent text, priority integer,"
                 " meta_version integer, fingerprint integer, wbdisk integer, pool text)");
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK) {
                if (strcmp(zErrMsg, __METADATA_TAB_EXIST__)) {
                        DERROR("SQL error: %s\n", zErrMsg);
                        SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                        UNIMPLEMENTED(__DUMP__);
                } else {
                        DBUG("table chunk exist\n");
                }
        }

        sqlite3_free(zErrMsg);

        __disk_pragma(idx);

        return 0;
err_ret:
        return ret;
}

/*===============call list start===============*/
static int __disk_sqlite3_setparent(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        const chkid_t *parent = va_arg(ap, const chkid_t *);
        char key[MAX_BUF_LEN], _parent[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        va_end(ap);

        base64_encode((void *)chkid, sizeof(*chkid), key);
        base64_encode((void *)parent, sizeof(*parent), _parent);

        snprintf(sql, MAX_BUF_LEN, "update %s set parent='%s' where key='%s'",
                 __disk_table_type(chkid),  _parent, key);

        DBUG("update "CHKID_FORMAT" parent "CHKID_FORMAT" %s\n",
              CHKID_ARG(chkid), CHKID_ARG(parent), sql);

        ret = sqlite3_exec(__disk__->db[__disk_db_hash(chkid)], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);

        return 0;
}

/*
 * Update the stored parent of `chkid`. The work is handed to
 * schedule_newthread() under SCHE_THREAD_SQLITE, keyed by the chunk's DB
 * hash; its return value is passed through.
 */
int disk_sqlite3_setparent(const chkid_t *chkid, const chkid_t *parent)
{
        int hash = __disk_db_hash(chkid);

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, TRUE, __FUNCTION__, -1,
                                  __disk_sqlite3_setparent, chkid, parent);
}

static int __disk_sqlite3_setloc(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        const diskloc_t *loc = va_arg(ap, const diskloc_t *);
        char key[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        va_end(ap);

        __disk_exist(loc);

        base64_encode((void *)chkid, sizeof(*chkid), key);

        snprintf(sql, MAX_BUF_LEN, "update %s set disk=%u,offset=%llu,wbdisk=%d where key='%s'",
                 __disk_table_type(chkid),  loc->diskid, (LLU)loc->idx, DISK_NONE_IDX, key);

        DBUG("chunk "CHKID_FORMAT" sql %s\n", CHKID_ARG(chkid), sql);

        ret = sqlite3_exec(__disk__->db[__disk_db_hash(chkid)], sql, NULL, 0, &zErrMsg);
        if (unlikely(ret != SQLITE_OK)){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);

        return 0;
}

/*
 * Update the stored disk location of `chkid`. The work is handed to
 * schedule_newthread() under SCHE_THREAD_SQLITE, keyed by the chunk's DB
 * hash; its return value is passed through.
 */
int disk_sqlite3_setloc(const chkid_t *chkid, const diskloc_t *loc)
{
        int hash = __disk_db_hash(chkid);

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, TRUE, __FUNCTION__, -1,
                                  __disk_sqlite3_setloc, chkid, loc);
}

/*
 * Worker: read the meta_version column of one chunk's row.
 *
 * Variadic arguments, in order: const chkid_t *chkid, uint64_t *meta_version.
 * Returns 0 and stores the value on success, ENOENT when no row matches the
 * key. An SQL failure is logged and escalated through UNIMPLEMENTED(__DUMP__).
 */
static int __disk_sqlite3_getmetaversion(va_list ap)
{
        int ret, row, column;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        uint64_t *meta_version = va_arg(ap, uint64_t *);
        char key[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL, **result;
        unsigned long long version;

        va_end(ap);

        base64_encode((void *)chkid, sizeof(*chkid), key);

        snprintf(sql, MAX_BUF_LEN, "select meta_version from %s where key='%s'",
                 __disk_table_type(chkid), key);

        DBUG("chunk "CHKID_FORMAT" sql %s\n", CHKID_ARG(chkid), sql);

        ret = sqlite3_get_table(__disk__->db[__disk_db_hash(chkid)], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        if (row == 0) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        YASSERT(row == 1);

        /* Parse into an unsigned long long and assign: "%lu" only matches
         * uint64_t on platforms where unsigned long is 64 bits wide. */
        ret = sscanf(__sqlite3_get_result(result, row, column, 0, 0), "%llu", &version);
        YASSERT(ret == 1);
        *meta_version = version;

        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);

        return 0;
err_ret:
        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);
        return ret;
}

/*
 * Fetch the stored meta_version of `chkid` into *meta_version. The work is
 * handed to schedule_newthread() under SCHE_THREAD_SQLITE, keyed by the
 * chunk's DB hash; its return value is passed through.
 */
int disk_sqlite3_getmetaversion(const chkid_t *chkid, uint64_t *meta_version)
{
        int hash = __disk_db_hash(chkid);

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, TRUE, __FUNCTION__, -1,
                                  __disk_sqlite3_getmetaversion, chkid, meta_version);
}

static int __disk_sqlite3_setmetaversion(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        uint64_t meta_version = va_arg(ap, uint64_t);
        uint64_t fingerprint = va_arg(ap, uint64_t);
        char key[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        va_end(ap);

        base64_encode((void *)chkid, sizeof(*chkid), key);

        snprintf(sql, MAX_BUF_LEN, "update %s set meta_version='%llu', fingerprint='%llu' where key='%s'",
                 __disk_table_type(chkid),  (LLU)meta_version, (LLU)fingerprint, key);

        DBUG("chunk "CHKID_FORMAT" sql %s\n", CHKID_ARG(chkid), sql);

        ret = sqlite3_exec(__disk__->db[__disk_db_hash(chkid)], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);

        return 0;
}

/*
 * Store a new meta_version for `chkid`; the fingerprint column is reset to 0
 * by the same update. The work is handed to schedule_newthread() under
 * SCHE_THREAD_SQLITE, keyed by the chunk's DB hash; its return value is
 * passed through.
 */
int disk_sqlite3_setmetaversion(const chkid_t *chkid, uint64_t meta_version)
{
        /* The worker reads the fingerprint with va_arg(ap, uint64_t), so the
         * variadic argument must really be a uint64_t: a bare `0` is an int. */
        return schedule_newthread(SCHE_THREAD_SQLITE, __disk_db_hash(chkid), TRUE, __FUNCTION__, -1, __disk_sqlite3_setmetaversion,
                        chkid, meta_version, (uint64_t)0);
}

/*
 * Worker: load one chunk's row.
 *
 * Variadic arguments, in order: const chkid_t *chkid, diskloc_t *loc,
 * chkid_t *parent, uint64_t *fingerprint (may be NULL), int *wbdisk,
 * char *pool (may be NULL).
 *
 * Column positions follow the table definition in __disk_init():
 * 0 key, 1 disk, 2 offset, 3 parent, 4 priority, 5 meta_version,
 * 6 fingerprint, 7 wbdisk, 8 pool.
 *
 * Returns 0 on success, ENOENT when no row matches the key. An SQL failure
 * is logged and escalated through UNIMPLEMENTED(__DUMP__). The pool name is
 * copied with an unbounded strcpy(), so the caller's buffer must be large
 * enough for it.
 */
static int __disk_sqlite3_load(va_list ap)
{
        int ret, row, column, slen;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        diskloc_t *loc = va_arg(ap, diskloc_t *);
        chkid_t *parent = va_arg(ap, chkid_t *);
        uint64_t *fingerprint = va_arg(ap, uint64_t *);
        int *wbdisk = va_arg(ap, int *);
        char *pool = va_arg(ap, char *);
        char key[MAX_BUF_LEN],  _parent[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL, **result;
        unsigned long long value;

        va_end(ap);

        base64_encode((void *)chkid, sizeof(*chkid), key);

        snprintf(sql, MAX_BUF_LEN, "select * from %s where key='%s'",
                        __disk_table_type(chkid), key);

        DBUG("chunk "CHKID_FORMAT" sql %s\n", CHKID_ARG(chkid), sql);

        ret = sqlite3_get_table(__disk__->db[__disk_db_hash(chkid)], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        if (row == 0) {
                ret = ENOENT;
                goto err_ret;
        }

        YASSERT(row == 1);

        ret = sscanf(__sqlite3_get_result(result, row, column, 0, 1), "%u", &loc->diskid);
        YASSERT(ret == 1);
        ret = sscanf(__sqlite3_get_result(result, row, column, 0, 2), "%u", &loc->idx);
        YASSERT(ret == 1);

        /* Copy the decoded bytes rather than dereferencing the char buffer
         * through a chkid_t pointer: the buffer carries no alignment
         * guarantee for chkid_t and the cast breaks strict aliasing. */
        base64_decode(__sqlite3_get_result(result, row, column, 0, 3), &slen, _parent);
        memcpy(parent, _parent, sizeof(*parent));

        if (fingerprint) {
                /* "%lu" only matches uint64_t where unsigned long is 64 bits;
                 * parse into an unsigned long long and assign instead. */
                ret = sscanf(__sqlite3_get_result(result, row, column, 0, 6), "%llu", &value);
                YASSERT(ret == 1);
                *fingerprint = value;
        }

        ret = sscanf(__sqlite3_get_result(result, row, column, 0, 7), "%d", wbdisk);
        YASSERT(ret == 1);

        if (pool)
                strcpy(pool, __sqlite3_get_result(result, row, column, 0, 8));

        DBUG("chunk "CHKID_FORMAT", loc (%u, %u), parent "CHKID_FORMAT", sql %s, hash %u\n",
                        CHKID_ARG(chkid), loc->diskid, loc->idx, CHKID_ARG(parent), sql, __disk_db_hash(chkid));

        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);

        return 0;
err_ret:
        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);
        return ret;
}

/*
 * Load the location, parent and (optionally) pool name stored for `chkid`.
 * The worker also reads the fingerprint and wbdisk columns; they land in
 * locals here and are discarded. The work is handed to schedule_newthread()
 * under SCHE_THREAD_SQLITE and its return value is passed through.
 */
int disk_sqlite3_load(const chkid_t *chkid, diskloc_t *loc, chkid_t *parent, char *pool)
{
        uint64_t unused_fingerprint;
        int unused_wbdisk;
        int hash = __disk_db_hash(chkid);

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, TRUE, __FUNCTION__, -1,
                                  __disk_sqlite3_load,
                                  chkid, loc, parent, &unused_fingerprint, &unused_wbdisk, pool);
}

/*
 * core_request() callback: unpack (chkid, loc, parent, pool) from the
 * argument list and forward them to disk_sqlite3_load().
 * Returns 0 on success or the error from disk_sqlite3_load().
 */
int __disk_sqlite3_load_request(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        diskloc_t *loc = va_arg(ap, diskloc_t *);
        chkid_t *parent = va_arg(ap, chkid_t *);
        char *pool = va_arg(ap, char *);
        int ret;

        va_end(ap);

        ret = disk_sqlite3_load(chkid, loc, parent, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Same contract as disk_sqlite3_load(), but the call is routed through
 * core_request() on the core selected by core_hash(chkid).
 * Returns 0 on success or the error reported by core_request().
 */
int disk_sqlite3_load_request(const chkid_t *chkid, diskloc_t *loc, chkid_t *parent, char *pool)
{
        int ret = core_request(core_hash(chkid), -1, "disk sqlite3 load",
                               __disk_sqlite3_load_request,
                               chkid, loc, parent, pool);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __disk_sqlite3_load_multi(va_list ap)
{
        int ret, row, column;
        const chkid_t *chkids = va_arg(ap, const chkid_t *);
        int chknum = va_arg(ap, int);
        int *exists = va_arg(ap, int *);
        char key[MAX_BUF_LEN],  sql[MAX_BUF_LEN];
        char *zErrMsg = NULL, **result;
        const chkid_t *chkid;

        va_end(ap);

        memset(exists, 0x0, sizeof(int) * chknum);

        for (int i=0; i < chknum; i++) {
                chkid = &chkids[i];

                base64_encode((void *)chkid, sizeof(*chkid), key);

                snprintf(sql, MAX_BUF_LEN, "select * from %s where key='%s'", __disk_table_type(chkid), key);

                DBUG("chunk "CHKID_FORMAT" sql %s\n", CHKID_ARG(chkid), sql);

                ret = sqlite3_get_table(__disk__->db[__disk_db_hash(chkid)], sql, &result, &row, &column, &zErrMsg);
                if (ret != SQLITE_OK ){
                        DERROR("SQL error: %s\n", zErrMsg);
                        SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                        UNIMPLEMENTED(__DUMP__);
                }

                sqlite3_free(zErrMsg);
                sqlite3_free_table(result);

                DBUG("sql %s row %d\n", sql, row);

                if (row == 0) {
                        // ret = ENOENT;
                        // goto err_ret;
                        continue;
                }

                YASSERT(row == 1);

                exists[i] = 1;
        }

        return 0;
// err_ret:
//         sqlite3_free(zErrMsg);
//         sqlite3_free_table(result);
//         return ret;
}

/*
 * Batch existence check: exists[i] becomes 1 when chkids[i] has a row, else 0.
 * The work is handed to schedule_newthread() under SCHE_THREAD_SQLITE and its
 * return value is passed through.
 * NOTE(review): scheduling is keyed by the hash of the first id only, while
 * the worker queries db[hash] of every id in the batch.
 */
int disk_sqlite3_load_multi(const chkid_t *chkids, int chknum, int *exists)
{
        int hash = __disk_db_hash(chkids);

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, TRUE, __FUNCTION__, -1,
                                  __disk_sqlite3_load_multi, chkids, chknum, exists);
}

/*
 * core_request() callback: unpack (chkids, chknum, exists) from the argument
 * list and forward them to disk_sqlite3_load_multi().
 * Returns 0 on success or the error from disk_sqlite3_load_multi().
 */
int __disk_sqlite3_load_request_multi(va_list ap)
{
        const chkid_t *chkids = va_arg(ap, chkid_t *);
        int chknum = va_arg(ap, int);
        int *exists = va_arg(ap, int *);
        int ret;

        va_end(ap);

        ret = disk_sqlite3_load_multi(chkids, chknum, exists);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Same contract as disk_sqlite3_load_multi(), but the call is routed through
 * core_request() on the core selected by core_hash(chkids).
 * Returns 0 on success or the error reported by core_request().
 */
int disk_sqlite3_load_request_multi(const chkid_t *chkids, int chknum, int *exists)
{
        int ret = core_request(core_hash(chkids), -1, "disk_sqlite3_load_request_multi",
                               __disk_sqlite3_load_request_multi,
                               chkids, chknum, exists);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Worker: delete the rows of `count` consecutive chunk ids.
 *
 * Variadic arguments, in order: const chkid_t *chkid (array), uint32_t count.
 * count must be non-zero. Each delete must affect zero or one row. Always
 * returns 0; an SQL failure is logged and escalated through
 * UNIMPLEMENTED(__DUMP__).
 */
static int __disk_sqlite3_del(va_list ap)
{
        int ret, changes;
        uint32_t i;
        const chkid_t *_chkid = va_arg(ap, const chkid_t *);
        uint32_t count = va_arg(ap, uint32_t);
        const chkid_t *chkid;
        char key[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        va_end(ap);

        YASSERT(count);

        /* the index is unsigned like count, so the comparison is not mixed-sign */
        for (i = 0; i < count; i++) {
                /* the ids are only read, so the const qualifier is kept */
                chkid = &_chkid[i];

                base64_encode((void *)chkid, sizeof(*chkid), key);

                DBUG("del "CHKID_FORMAT" %s db[%u]\n", CHKID_ARG(chkid), key, __disk_db_hash(chkid));

                snprintf(sql, MAX_BUF_LEN, "delete from %s where key='%s'", __disk_table_type(chkid), key);

                DBUG("sql %s\n", sql);

                ret = sqlite3_exec(__disk__->db[__disk_db_hash(chkid)], sql, NULL, 0, &zErrMsg);
                if (ret != SQLITE_OK ){
                        DERROR("SQL error: %s\n", zErrMsg);
                        SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                        UNIMPLEMENTED(__DUMP__);
                }

                sqlite3_free(zErrMsg);

                changes = sqlite3_changes(__disk__->db[__disk_db_hash(chkid)]);
                DBUG("delete from %d.db key %s changes count %d\n", __disk_db_hash(chkid), key, changes);

                // TODO: this can run concurrently with pool deletion, which leaves changes == 0
                YASSERT(1 == changes || 0 == changes);
        }

        return 0;
}

/*
 * Delete the rows of `count` chunk ids starting at `chkid`. `loc` is accepted
 * but unused. The work is handed to schedule_newthread() under
 * SCHE_THREAD_SQLITE, keyed by the DB hash of the first id; its return value
 * is passed through.
 */
int disk_sqlite3_del(const chkid_t *chkid, const diskloc_t *loc, uint32_t count)
{
        int hash = __disk_db_hash(chkid);

        (void) loc;

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, TRUE, __FUNCTION__, -1,
                                  __disk_sqlite3_del, chkid, count);
}

/*
 * core_request() callback: unpack (chkid, loc, count) from the argument list
 * and forward them to disk_sqlite3_del().
 * Returns 0 on success or the error from disk_sqlite3_del().
 */
int __disk_sqlite3_del_request(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const diskloc_t *loc = va_arg(ap, const diskloc_t *);
        uint32_t count = va_arg(ap, uint32_t);
        int ret;

        va_end(ap);

        ret = disk_sqlite3_del(chkid, loc, count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Same contract as disk_sqlite3_del(), but the call is routed through
 * core_request() on the core selected by core_hash(chkid).
 * Returns 0 on success or the error reported by core_request().
 */
int disk_sqlite3_del_request(const chkid_t *chkid, const diskloc_t *loc, uint32_t count)
{
        int ret = core_request(core_hash(chkid), -1, "disk sqlite3 del",
                               __disk_sqlite3_del_request, chkid, loc, count);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __disk_sqlite3_create(va_list ap)
{
        int ret;
        const char *pool = va_arg(ap, const char *);
        const chkid_t *chkids = va_arg(ap, const chkid_t *);
        const diskloc_t *locs = va_arg(ap, const diskloc_t *);
        int chknum = va_arg(ap, int);
        const chkid_t *parent = va_arg(ap, const chkid_t *);
        int priority = va_arg(ap, int);
        uint64_t meta_version = va_arg(ap, uint64_t);
        uint64_t fingerprint = va_arg(ap, uint64_t);
        int flag = va_arg(ap, int);
        const chkid_t *chkid;
        const diskloc_t *loc;
        char key[MAX_BUF_LEN], _parent[MAX_BUF_LEN], sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        (void) flag;
        va_end(ap);

        for (int i=0; i < chknum; i++) {
                chkid = &chkids[i];
                loc = &locs[i];

                __disk_exist(loc);

                base64_encode((void *)chkid, sizeof(*chkid), key);
                //base64_encode((void *)loc, sizeof(*loc), _loc);
                base64_encode((void *)parent, sizeof(*parent), _parent);

                snprintf(sql, MAX_BUF_LEN, "insert into %s values ('%s', '%d', '%u', '%s', '%d', '%ju', '%ju', '%d', '%s')",
                         __disk_table_type(chkid), key, loc->diskid, loc->idx, _parent, priority,
                         meta_version, fingerprint, DISK_NONE_IDX, pool);

                DBUG("chunk "CHKID_FORMAT", loc (%u, %u), parent "CHKID_FORMAT" %s\n",
                     CHKID_ARG(chkid), loc->diskid, loc->idx, CHKID_ARG(parent), sql);

                ret = sqlite3_exec(__disk__->db[__disk_db_hash(chkid)], sql, NULL, 0, &zErrMsg);
                if (unlikely(ret != SQLITE_OK)){
                        if (strcmp(zErrMsg, __KEY_EXIST__) == 0) {
                                // TODO batch insert时，导致整体失败？
                                DINFO("chunk "CHKID_FORMAT" exist\n", CHKID_ARG(chkid));
                                ret = EEXIST;
                                GOTO(err_ret, ret);
                        } else {
                                DERROR("SQL error: %s\n", zErrMsg);
                                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                                UNIMPLEMENTED(__DUMP__);
                        }
                }

                sqlite3_free(zErrMsg);
        }

        return 0;
err_ret:
        sqlite3_free(zErrMsg);
        return ret;
}

/*
 * Insert rows for `chknum` new chunks (ids in chkid[], locations in loc[])
 * belonging to `pool`, all sharing `parent` and `meta_version`. Priority is
 * stored as -1 and fingerprint as 0. The work is handed to
 * schedule_newthread() under SCHE_THREAD_SQLITE, keyed by the DB hash of the
 * first id; its return value is passed through.
 */
int disk_sqlite3_create(const char *pool, const chkid_t *chkid, const diskloc_t *loc, int chknum,
                        const chkid_t *parent,
                        uint64_t meta_version, int flag)
{
        /* The worker reads the fingerprint with va_arg(ap, uint64_t), so the
         * variadic argument must really be a uint64_t: a bare `0` is an int. */
        return schedule_newthread(SCHE_THREAD_SQLITE, __disk_db_hash(chkid), TRUE, __FUNCTION__, -1, __disk_sqlite3_create,
                        pool, chkid, loc, chknum, parent, -1, meta_version, (uint64_t)0, flag);
}

/*
 * Worker: run "vacuum" on the DB slot that `chkid` hashes to.
 *
 * Variadic arguments: const chkid_t *chkid (only its hash is used).
 * Returns 0 on success; on failure a warning is logged and the sqlite3
 * result code is returned as is.
 */
static int __disk_sqlite3_vacuum(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        char stmt[MAX_NAME_LEN];
        char *errmsg = NULL;
        int slot = __disk_db_hash(chkid);
        int rc;

        va_end(ap);

        snprintf(stmt, MAX_NAME_LEN, "vacuum");

        rc = sqlite3_exec(__disk__->db[slot], stmt, NULL, 0, &errmsg);
        if (rc == SQLITE_OK) {
                sqlite3_free(errmsg);
                return 0;
        }

        DWARN("[%d] sqlite vacuum failed, error[%d]: %s...\n",
                        slot, rc, errmsg);
        sqlite3_free(errmsg);

        return rc;
}

/*
 * Vacuum the DB slot that `chkid` hashes to. The work is handed to
 * schedule_newthread() under SCHE_THREAD_SQLITE; unlike the other wrappers in
 * this file the third argument is FALSE. Its return value is passed through.
 */
int disk_sqlite3_vacuum(const chkid_t *chkid)
{
        int hash = __disk_db_hash(chkid);

        return schedule_newthread(SCHE_THREAD_SQLITE, hash, FALSE, __FUNCTION__, -1,
                                  __disk_sqlite3_vacuum, chkid);
}

/*
 * core_request() callback: unpack the chunk id from the argument list and
 * forward it to disk_sqlite3_vacuum().
 * Returns 0 on success or the error from disk_sqlite3_vacuum().
 */
static int __disk_sqlite3_vacuum_request(va_list ap)
{
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        int ret;

        va_end(ap);

        ret = disk_sqlite3_vacuum(chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Same contract as disk_sqlite3_vacuum(), but the call is routed through
 * core_request() on the core selected by core_hash(chkid).
 * Returns 0 on success or the error reported by core_request().
 */
int disk_sqlite3_vacuum_request(const chkid_t *chkid)
{
        int ret = core_request(core_hash(chkid), -1, "disk sqlite3 vacuum",
                               __disk_sqlite3_vacuum_request, chkid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*===============call list end===============*/

static void __disk_sqlite3_begin_transaction__(int idx)
{
        int ret;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        snprintf(sql, MAX_BUF_LEN, "begin transaction");
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);
}

static void __disk_sqlite3_commit_transaction__(int idx)
{
        int ret;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        snprintf(sql, MAX_BUF_LEN, "commit transaction");
        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);
}

/*
 * Hook table handed to sche_thread_ops_register() by
 * disk_sqlite3_ops_register(): the transaction begin/commit callbacks for the
 * SCHE_THREAD_SQLITE thread type.
 * NOTE(review): when the scheduler invokes these hooks is decided outside
 * this file -- confirm in the sche_thread implementation.
 */
struct sche_thread_ops sqlite_ops = {
        .type           = SCHE_THREAD_SQLITE,
        .begin_trans    = __disk_sqlite3_begin_transaction__,
        .commit_trans   = __disk_sqlite3_commit_transaction__,
};

/*
 * Register sqlite_ops with the thread scheduler, passing __DB_HASH__ as the
 * third argument. Returns whatever sche_thread_ops_register() returns.
 */
int disk_sqlite3_ops_register()
{
        int ret = sche_thread_ops_register(&sqlite_ops, sqlite_ops.type, __DB_HASH__);

        return ret;
}

int disk_sqlite3_init(const char *home)
{
        int ret, i;

        ret = ymalloc((void **)&__disk__, sizeof(*__disk__));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(__disk__, 0x0, sizeof(*__disk__));
        strcpy(__disk__->home, home);
        YASSERT(strstr(home, "data"));

        for (i = 0; i < __DB_HASH__; i++) {
                ret = __disk_init(home, i);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        ret = disk_sqlite3_vacuum_init();
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&__disk__);
err_ret:
        return ret;
}

/*
 * Count the rows of table `type` in DB slot `idx` that reference `diskid`.
 *
 * The column compared is "wbdisk" when disk_type is __DISK_ROLE_SSD__ and
 * "disk" otherwise. Returns 0 and stores the count in *chunk_count, or ENOENT
 * if the query yields no row. An SQL failure is logged and escalated through
 * UNIMPLEMENTED(__DUMP__).
 */
static int __disk_count__(int idx, const char *type, int diskid, int disk_type, uint64_t *chunk_count)
{
        int ret, row, column;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL, **result;
        const char *column_name;
        unsigned long long count;

        if (disk_type == __DISK_ROLE_SSD__) {
                column_name = "wbdisk";
        } else {
                column_name = "disk";
        }

        snprintf(sql, MAX_BUF_LEN, "select count(*) from %s where %s='%d'",
                        type, column_name, diskid);

        DBUG("disk writeback %d.db sql %s\n", idx, sql);

        ret = sqlite3_get_table(__disk__->db[idx], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        if (row == 0) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        YASSERT(row == 1);

        /* Parse into an unsigned long long and assign: "%lu" only matches
         * uint64_t on platforms where unsigned long is 64 bits wide. */
        ret = sscanf(__sqlite3_get_result(result, row, column, 0, 0), "%llu", &count);
        YASSERT(ret == 1);
        *chunk_count = count;

        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);

        return 0;
err_ret:
        sqlite3_free(zErrMsg);
        sqlite3_free_table(result);
        return ret;
}

/*
 * Sum the number of "raw" and "metadata" rows that reference diskid across
 * all __DB_HASH__ databases.
 *
 * @param diskid       disk id to match
 * @param disk_type    forwarded to __disk_count__() to pick the column
 * @param chunk_count  receives the total; written only on success
 * @return 0 on success, or the first error returned by __disk_count__()
 */
int disk_sqlite3_count(int diskid, int disk_type, uint64_t *chunk_count)
{
        static const char *const tables[] = { "raw", "metadata" };
        int ret, idx, t;
        uint64_t sum = 0, partial;

        for (idx = 0; idx < __DB_HASH__; idx++) {
                /* same order as before: raw first, then metadata */
                for (t = 0; t < 2; t++) {
                        ret = __disk_count__(idx, tables[t], diskid, disk_type, &partial);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        sum += partial;
                }
        }

        *chunk_count = sum;

        return 0;
err_ret:
        return ret;
}

/*
 * Count the rows that reference diskid through the column selected for
 * __DISK_ROLE_HDD__; see disk_sqlite3_count().
 */
int disk_sqlite3_chunk_count(int diskid, uint64_t *chunk_count)
{
        int ret;

        ret = disk_sqlite3_count(diskid, __DISK_ROLE_HDD__, chunk_count);

        return ret;
}

/*
 * Invoke func(arg, chkid, pool) for every row of table in database idx.
 *
 * @param idx        index into __disk__->db[]
 * @param table      table name placed in the query
 * @param condition  optional text appended after "where"; NULL selects all rows
 * @param func       callback invoked once per row
 * @param arg        opaque pointer handed to func as its first argument
 *
 * The chkid passed to func points into a local buffer that is overwritten on
 * the next row, and pool points into the sqlite result table that is freed
 * before this function returns.
 */
static void __disk_iterator(int idx, const char *table, const char *condition, func2_t func, void *arg)
{
        int ret, row, column, slen, i;
        char key[MAX_BUF_LEN], sql[MAX_BUF_LEN], *pool, *encoded;
        char *zErrMsg = NULL, **result;
        chkid_t *chkid;

        if (condition)
                snprintf(sql, MAX_BUF_LEN, "select key,pool from %s where %s", table, condition);
        else
                snprintf(sql, MAX_BUF_LEN, "select key,pool from %s", table);

        ret = sqlite3_get_table(__disk__->db[idx], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        for (i = 0; i < row; i++) {
                encoded = __sqlite3_get_result(result, row, column, i, 0);
                pool = __sqlite3_get_result(result, row, column, i, 1);

                base64_decode(encoded, &slen, key);
                chkid = (void *)key;

                /*
                 * The query returns two columns, so result[i + 1] is not the
                 * key of row i; log the encoded key fetched above instead.
                 */
                DBUG("db[%u][%u] chunk "CHKID_FORMAT" code %s\n",
                      idx, i, CHKID_ARG(chkid), encoded);
                func(arg, chkid, pool);
        }

        sqlite3_free_table(result);
        sqlite3_free(zErrMsg);
}

/*
 * Walk every row of table in each of the __DB_HASH__ databases, in index
 * order, handing each row to func through __disk_iterator().
 */
void disk_sqlite3_iterator(const char *table, func2_t func, void *arg)
{
        int idx = 0;

        while (idx < __DB_HASH__) {
                /* NULL condition: no "where" clause, all rows are visited */
                __disk_iterator(idx, table, NULL, func, arg);
                idx++;
        }
}

/*
 * Load every key of table in database idx into a temporary array, then call
 * func(&array[i], NULL) for each of them.
 *
 * @param idx        index into __disk__->db[]
 * @param table      table name placed in the query
 * @param condition  optional text appended after "where"; NULL selects all rows
 * @param func       callback invoked once per decoded key
 *
 * The array lives in a private mapping of an unlinked temporary file under
 * /tmp. Failures while setting the mapping up are handled the same way as SQL
 * errors in this file: logged, then UNIMPLEMENTED(__DUMP__).
 */
static void __disk_preload(int idx, const  char *table, const char *condition, func1_t func)
{
        int ret, row, column, slen, i, fd = -1;
        char key[MAX_BUF_LEN], sql[MAX_BUF_LEN], path[MAX_PATH_LEN];
        char *zErrMsg = NULL, **result, *encoded;
        chkid_t *chkid, *array = NULL;
        void *addr = MAP_FAILED;
        uint64_t size = 0;

        DINFO("table %s[%u] preload begin\n", table, idx);

        if (condition)
                snprintf(sql, MAX_BUF_LEN, "select key from %s where %s", table, condition);
        else
                snprintf(sql, MAX_BUF_LEN, "select key from %s", table);

        ret = sqlite3_get_table(__disk__->db[idx], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        /* a zero-length mmap() is rejected, so only build the array when there are rows */
        if (row > 0) {
                size = sizeof(chkid_t) * row;

                snprintf(path, sizeof(path), "/tmp/preload-XXXXXX");
                fd = mkstemp(path);
                if (fd < 0) {
                        ret = errno;
                        DERROR("mkstemp %s fail, (%d) %s\n", path, ret, strerror(ret));
                        UNIMPLEMENTED(__DUMP__);
                }

                /* the file is only needed through fd; drop its name right away */
                unlink(path);

                ret = ftruncate(fd, size);
                if (ret < 0) {
                        ret = errno;
                        DERROR("ftruncate %ju fail, (%d) %s\n", (uintmax_t)size, ret, strerror(ret));
                        UNIMPLEMENTED(__DUMP__);
                }

                /* mmap() reports failure with MAP_FAILED, never with NULL */
                addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
                if (addr == MAP_FAILED) {
                        ret = errno;
                        DERROR("mmap %ju fail, (%d) %s\n", (uintmax_t)size, ret, strerror(ret));
                        UNIMPLEMENTED(__DUMP__);
                }

                array = addr;
        }

        for (i = 0; i < row; i++) {
                encoded = __sqlite3_get_result(result, row, column, i, 0);
                base64_decode(encoded, &slen, key);
                chkid = (void *)key;
                array[i] =  *chkid;

                DBUG("db[%u][%u] chunk "CHKID_FORMAT" %s\n",
                     idx, i, CHKID_ARG(chkid), encoded);
        }

        DINFO("table %s[%u] preload scan success\n", table, idx);
        DINFO("table %s[%u] preload insert begin\n", table, idx);

        for (i = 0; i < row; i++) {
                func(&array[i], NULL);
        }

        sqlite3_free_table(result);
        sqlite3_free(zErrMsg);

        if (row > 0) {
                munmap(addr, size);
                close(fd);
        }

        DINFO("table %s[%u] preload success\n", table, idx);
}


//TODO
/*
 * Pack the variadic arguments into a va_list and hand it to func.
 * The value returned by func is not used.
 */
static void __disk_iterator2_func(func_va_t func, ...)
{
        va_list ap;

        va_start(ap, func);
        func(ap);
        /* every va_start() must be matched by va_end() in the same function */
        va_end(ap);
}

/*
 * Read key, disk, offset, parent, meta_version and pool for each matching row
 * of table in database idx and pass them to func as a va_list, in this order:
 * arg, idx, chkid, diskid, offset, parent, pool, meta_version.
 *
 * @param idx        index into __disk__->db[]
 * @param table      table name placed in the query
 * @param condition  optional text appended after "where"; NULL selects all rows
 * @param func       va_list callback invoked once per row
 * @param arg        opaque pointer passed as the first variadic argument
 *
 * chkid, parent and pool point into local buffers that are reused for the
 * next row.
 */
static void __disk_iterator2(int idx, const char *table, const char *condition, func_va_t func, void *arg)
{
        int ret, row, column, slen, i;
        uint32_t diskid, offset;
        char key[MAX_BUF_LEN], _parent[MAX_BUF_LEN], sql[MAX_BUF_LEN], pool[MAX_NAME_LEN];
        char *zErrMsg = NULL, **result, *encoded;
        chkid_t *chkid, *parent;
        uint64_t meta_version;

        /* passing NULL to "%s" is undefined; treat it as "no filter" like __disk_iterator() */
        if (condition)
                snprintf(sql, MAX_BUF_LEN, "select key,disk,offset,parent,meta_version,pool from %s where %s", table, condition);
        else
                snprintf(sql, MAX_BUF_LEN, "select key,disk,offset,parent,meta_version,pool from %s", table);

        ret = sqlite3_get_table(__disk__->db[idx], sql, &result, &row, &column, &zErrMsg);
        if (ret != SQLITE_OK ){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        for (i = 0; i < row; i++) {
                encoded = __sqlite3_get_result(result, row, column, i, 0);
                base64_decode(encoded, &slen, key);
                chkid = (void *)key;

                ret = sscanf(__sqlite3_get_result(result, row, column, i, 1), "%u",&diskid);
                YASSERT(ret == 1);

                ret = sscanf(__sqlite3_get_result(result, row, column, i, 2), "%u",&offset);
                YASSERT(ret == 1);

                base64_decode(__sqlite3_get_result(result, row, column, i, 3), &slen, _parent);
                parent = (void *)_parent;

                ret = sscanf(__sqlite3_get_result(result, row, column, i, 4), "%lu",&meta_version);
                YASSERT(ret == 1);

                /* bounded copy: the stored pool name must not overrun pool[] */
                snprintf(pool, sizeof(pool), "%s", __sqlite3_get_result(result, row, column, i, 5));

                /*
                 * The query returns six columns, so result[i + 1] is not the
                 * key of row i; log the encoded key fetched above instead.
                 */
                DBUG("db[%u][%u] chunk "CHKID_FORMAT" code %s\n",
                      idx, i, CHKID_ARG(chkid), encoded);
                __disk_iterator2_func(func, arg, idx, chkid, diskid, offset, parent, pool, meta_version);
        }

        sqlite3_free_table(result);
        sqlite3_free(zErrMsg);
}

/*
 * Iterate the rows of table matching condition in a single database.
 * cursor is used directly as the database index; it is not range-checked here.
 */
void disk_sqlite3_iterator_cursor(const char *table, int cursor,
                                  const char *condition, func_va_t func, void *arg)
{
        int idx = cursor;

        __disk_iterator2(idx, table, condition, func, arg);
}

/*
 * Close every database handle held in __disk__->db[], then release
 * __disk_sqlite3__[0] and the __disk__ context itself.
 * The result of sqlite3_close() is not checked.
 */
void disk_sqlite3_close()
{
        int idx = 0;

        while (idx < __DB_HASH__) {
                sqlite3_close(__disk__->db[idx]);
                idx++;
        }

        yfree((void **)&__disk_sqlite3__[0]);
        yfree((void **)&__disk__);
}

static int __disk_sqlite3_cleanup__(int idx, const char *pool, const char *table)
{
        int ret;
        char sql[MAX_BUF_LEN];
        char *zErrMsg = NULL;

        snprintf(sql, MAX_BUF_LEN, "delete from %s where pool='%s'", table, pool);

        DWARN("idx %d sql %s\n", idx, sql);

        ret = sqlite3_exec(__disk__->db[idx], sql, NULL, 0, &zErrMsg);
        if (unlikely(ret != SQLITE_OK)){
                DERROR("SQL error: %s\n", zErrMsg);
                SERROR(0, "%s SQL error: %s\n", M_DISK_META_ERROR, zErrMsg);
                UNIMPLEMENTED(__DUMP__);
        }

        sqlite3_free(zErrMsg);

        return 0;
}

/* Work item handed to one __disk_cleanup_proc() thread by disk_sqlite3_cleanup(). */
typedef struct {
        /* pool name whose rows are deleted */
        const char *pool;
        /* thread running __disk_cleanup_proc() for this item */
        pthread_t th;
        /* database index this thread works on */
        int i;
        /* result written by the thread before it exits: 0 or an error code */
        int ret;
} disk_cleanup_arg_t;

/*
 * Thread entry point: delete the rows of one pool from the metadata table and
 * then from the raw table of database arg->i.
 *
 * The outcome is stored in arg->ret (0 on success, otherwise the error from
 * __disk_sqlite3_cleanup__()); the thread's own return value is always NULL.
 */
static void *__disk_cleanup_proc(void *_arg)
{
        int ret;
        disk_cleanup_arg_t *job = _arg;

        DINFO("pool %s idx %d\n", job->pool, job->i);

        /* metadata rows go first, raw rows second */
        ret = __disk_sqlite3_cleanup__(job->i, job->pool, __metadata__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __disk_sqlite3_cleanup__(job->i, job->pool, __raw__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("pool %s idx %d done\n", job->pool, job->i);

        job->ret = 0;
        return NULL;
err_ret:
        job->ret = ret;
        return NULL;
}

/*
 * Delete every row that belongs to pool from all __DB_HASH__ databases,
 * using one thread per database, and wait for all of them to finish.
 *
 * @param pool  pool name to purge; it must stay valid until this call returns
 *              because the worker threads read it through their argument
 * @return 0 when every worker succeeded, otherwise the error stored by the
 *         first failing worker (lowest database index)
 *
 * Failures of pthread_create(), pthread_attr_destroy() and pthread_join() go
 * to UNIMPLEMENTED(__DUMP__).
 */
int disk_sqlite3_cleanup(const char *pool)
{
        int ret, i;
        disk_cleanup_arg_t args[__DB_HASH__];
        disk_cleanup_arg_t *arg;
        pthread_attr_t ta;
        void *res;

        (void) pthread_attr_init(&ta);

        for (i = 0; i < __DB_HASH__; i++) {
                arg = &args[i];
                arg->pool = pool;
                arg->i = i;
                arg->ret = 0;

                ret = pthread_create(&arg->th, &ta, __disk_cleanup_proc, arg);
                if (unlikely(ret)) {
                        UNIMPLEMENTED(__DUMP__);
                }
        }

        ret = pthread_attr_destroy(&ta);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        /* args[] lives on this stack frame, so every thread must be joined before returning */
        for (i = 0; i < __DB_HASH__; i++) {
                arg = &args[i];
                ret = pthread_join(arg->th, &res);
                if (unlikely(ret)) {
                        UNIMPLEMENTED(__DUMP__);
                }
        }

        for (i = 0; i < __DB_HASH__; i++) {
                arg = &args[i];
                if (arg->ret) {
                        /*
                         * ret still holds the (successful) pthread_join()
                         * result here; report the worker's error instead.
                         */
                        ret = arg->ret;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/* Context passed through the va_list iterator to __disk_sqlite3_iterator_byvol_new__(). */
typedef struct {
        /* opaque pointer forwarded to func as its last argument */
        void *arg;
        /* per-row callback */
        dm_itor_t func;
        /* incremented once for every row handed to func */
        uint64_t count;
} arg_new_t;


/*
 * va_list adapter between __disk_iterator2() and a dm_itor_t callback.
 *
 * The arguments are read in the order __disk_iterator2() pushes them:
 * arg_new_t *, int idx, chkid_t *chkid, uint32_t diskid, uint32_t offset,
 * chkid_t *parent, char *pool, uint64_t meta_version.
 *
 * Bumps arg->count, builds a diskloc_t from (diskid, offset) and returns
 * whatever arg->func returns.
 */
static int __disk_sqlite3_iterator_byvol_new__(va_list ap)
{
        diskloc_t loc;
        arg_new_t *arg = va_arg(ap, arg_new_t *);
        int idx = va_arg(ap, int);
        chkid_t *chkid = va_arg(ap, chkid_t *);
        uint32_t diskid = va_arg(ap, uint32_t);
        uint32_t offset = va_arg(ap, uint32_t);
        chkid_t *parent = va_arg(ap, chkid_t *);
        char *pool = va_arg(ap, char *);
        uint64_t meta_version = va_arg(ap, uint64_t);

        (void) idx;

        /*
         * NOTE(review): ISO C expects va_end() in the function that called
         * va_start(); this call is kept as it was to avoid changing the
         * callback's behaviour.
         */
        va_end(ap);

        arg->count++;
        loc.diskid = diskid;
        loc.idx = offset;

        /* meta_version is 64 bits wide: "%u" would read the wrong argument size */
        DINFO("chunk "CHKID_FORMAT" pool %s disk(%u, %u) parent "CHKID_FORMAT" meta_version %ju\n",
              CHKID_ARG(chkid), pool, diskid, offset, CHKID_ARG(parent), (uintmax_t)meta_version);
        return arg->func(chkid, pool, &loc, parent, meta_version, arg->arg);
}


/*
 * Hand every chunk that belongs to volid to func, database by database.
 *
 * Raw rows are selected by parent == volid in batches of 100; metadata rows
 * are selected by key == volid or parent == volid. Each table is queried
 * again and again until a pass matches no row.
 *
 * NOTE(review): the loops therefore only terminate if func makes the rows it
 * receives stop matching (the log text suggests it deletes them) - confirm
 * against the callers.
 *
 * @param volid  volume id, base64-encoded to build the conditions
 * @param func   per-row callback
 * @param _arg   opaque pointer forwarded to func
 */
static void __disk_sqlite3_iterator_byvol_new(const volid_t *volid, dm_itor_t func, void *_arg)
{
        int i;
        arg_new_t arg;
        uint64_t total;
        char raw_cdt[MAX_NAME_LEN], md_cdt[MAX_NAME_LEN], key[MAX_NAME_LEN];

        arg.arg = _arg;
        arg.func = func;
        arg.count = 0;

        base64_encode((void *)volid, sizeof(*volid), key);

        snprintf(raw_cdt, MAX_NAME_LEN, "parent='%s' limit %d", key, 100);

        //metadata
        snprintf(md_cdt, MAX_NAME_LEN, "key='%s' or parent='%s'", key, key);

        for (i = 0; i < __DB_HASH__; i++) {
                total = 0;
                while(1) {
                        arg.count = 0;
                        disk_sqlite3_iterator_cursor("raw", i, raw_cdt,
                                                     __disk_sqlite3_iterator_byvol_new__, &arg);
                        if  (arg.count == 0) {
                                DINFO("cleanup database %d.raw finish, total %ju\n", i, total);
                                break;
                        }

                        total += arg.count;
                }

                total = 0;
                while(1) {
                        /*
                         * The counter must be cleared before every pass;
                         * otherwise one matching row keeps it non-zero
                         * forever and this loop can never exit.
                         */
                        arg.count = 0;
                        disk_sqlite3_iterator_cursor("metadata", i, md_cdt,
                                                     __disk_sqlite3_iterator_byvol_new__, &arg);
                        if  (arg.count == 0) {
                                DINFO("cleanup database %d.metadata finish, total %ju\n", i, total);
                                break;
                        }

                        total += arg.count;
                }
        }
}

//

/*
 * Hand every chunk stored on diskid to func, database by database.
 *
 * @param diskid  value matched against the "disk" column
 * @param func    per-row callback
 * @param _arg    opaque pointer forwarded to func
 * @param flag    DM_FLAG_RAW walks the "raw" table, DM_FLAG_MD walks the
 *                "metadata" table; both may be set, in which case raw rows of
 *                a database are visited before its metadata rows
 */
static void __disk_sqlite3_iterator_bydisk(int diskid, dm_itor_t func, void *_arg, int flag)
{
        int i;
        char condition[MAX_BUF_LEN];
        arg_new_t arg;

        snprintf(condition, MAX_BUF_LEN, "disk=%d", diskid);
        arg.arg = _arg;
        arg.func = func;
        /* the row callback increments count, so it must start from a defined value */
        arg.count = 0;

        for (i = 0; i < __DB_HASH__; i++) {
                /* each flag selects the table of the same name (they used to be crossed) */
                if (flag & DM_FLAG_RAW) {
                        disk_sqlite3_iterator_cursor("raw", i, condition,
                                                     __disk_sqlite3_iterator_byvol_new__, &arg);
                }

                if (flag & DM_FLAG_MD) {
                        disk_sqlite3_iterator_cursor("metadata", i, condition,
                                                     __disk_sqlite3_iterator_byvol_new__, &arg);
                }
        }
}



/*
 * Forward declarations for two entries of the __disk_maping__ table below
 * (.exist and .create); their definitions are not in this part of the file.
 */
int disk_sqlite3_load_batch(const chkid_t *chkid, int chknum, int *exists);

int disk_sqlite3_create_multi(const char *pool, const chkid_t *chkid,
                              const diskloc_t *loc, int chknum,
                              const chkid_t *parent,
                              uint64_t meta_version, int flag);

/*
 * sqlite3 implementation of the disk_maping_t operation table; handed out by
 * disk_maping_sqlite3().
 */
static disk_maping_t __disk_maping__ = {
        .init = disk_sqlite3_init,
        .close = disk_sqlite3_close,
        .create = disk_sqlite3_create_multi,
        .del = disk_sqlite3_del_request,
        .setparent = disk_sqlite3_setparent,
        .setloc = disk_sqlite3_setloc,
        .getmetaversion = disk_sqlite3_getmetaversion,
        .setmetaversion = disk_sqlite3_setmetaversion,
        .load = disk_sqlite3_load_request,
        .exist = disk_sqlite3_load_batch,
        .cleanup = disk_sqlite3_cleanup,
        /* counts through the __DISK_ROLE_HDD__ column only, see disk_sqlite3_chunk_count() */
        .count = disk_sqlite3_chunk_count,
        .iterator_bydisk = __disk_sqlite3_iterator_bydisk,
        .iterator_byvol = __disk_sqlite3_iterator_byvol_new,
        .iterator = disk_sqlite3_iterator,
        /* this backend provides no iterator_new operation */
        .iterator_new = NULL,
};

/* Return the address of the file-local sqlite3 operation table. */
disk_maping_t * disk_maping_sqlite3()
{
        disk_maping_t *ops = &__disk_maping__;

        return ops;
}
